/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;

import static org.apache.hadoop.util.Time.now;

/**
 * A thread per active or standby namenode to perform:
 * <ul>
 * <li> Pre-registration handshake with namenode</li>
 * <li> Registration with namenode</li>
 * <li> Send periodic heartbeats to the namenode</li>
 * <li> Handle commands received from the namenode</li>
 * </ul>
 */
@InterfaceAudience.Private
class BPServiceActor implements Runnable {

    static final Log LOG = DataNode.LOG;
    // Address of the NameNode this actor talks to.
    final InetSocketAddress nnAddr;
    // Last-known HA state (ACTIVE/STANDBY) of that NameNode, refreshed from
    // each heartbeat response in offerService().
    HAServiceState state;

    // The per-block-pool service that owns this actor.
    final BPOfferService bpos;

    // lastBlockReport, lastDeletedReport and lastHeartbeat may be assigned/read
    // by testing threads (through BPServiceActor#triggerXXX), while also
    // assigned/read by the actor thread. Thus they should be declared as volatile
    // to make sure the "happens-before" consistency.
    volatile long lastBlockReport = 0;
    volatile long lastDeletedReport = 0;

    // When true, the next block report is scheduled at a random offset so
    // that DataNodes don't all report at once; see scheduleNextBlockReport().
    boolean resetBlockReportTime = true;

    volatile long lastCacheReport = 0;

    // Thread running offerService(); created lazily in start().
    Thread bpThread;
    // Client-side RPC proxy to the NameNode; set in connectToNNAndHandshake()
    // or injected by tests via setNameNode().
    DatanodeProtocolClientSideTranslatorPB bpNamenode;
    private volatile long lastHeartbeat = 0;

    /**
     * Lifecycle state of this actor, from initial connection attempts
     * through normal operation to termination. Nested enums are implicitly
     * static, so the redundant {@code static} modifier is dropped.
     */
    enum RunningState {
        CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED
    }

    // Current lifecycle state; volatile because other threads read it via
    // isAlive().
    private volatile RunningState runningState = RunningState.CONNECTING;

    /**
     * Between block reports (which happen on the order of once an hour) the
     * DN reports smaller incremental changes to its block list. This map,
     * keyed by storage, holds the per-storage queues of pending changes
     * which have yet to be reported to the NN. Access should be
     * synchronized on this object.
     */
    private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
            pendingIncrementalBRperStorage = Maps.newHashMap();

    // IBR = Incremental Block Report. If this flag is set then an IBR will be
    // sent immediately by the actor thread without waiting for the IBR timer
    // to elapse.
    private volatile boolean sendImmediateIBR = false;
    // Cleared by stop() (or on fatal errors) to make the actor thread exit.
    private volatile boolean shouldServiceRun = true;
    private final DataNode dn;
    private final DNConf dnConf;

    // Registration handed back by the NameNode; may be null before this
    // actor has registered (see the guard in reportBadBlocks()).
    private DatanodeRegistration bpRegistration;

    /**
     * Create an actor that will handshake with and report to the NameNode at
     * {@code nnAddr} on behalf of the given {@code bpos}.
     */
    BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
        this.nnAddr = nnAddr;
        this.bpos = bpos;
        this.dn = bpos.getDataNode();
        this.dnConf = this.dn.getDnConf();
    }

    /**
     * @return true if this actor is still doing useful work: the service
     * flag is set, the actor thread exists and is alive, and the actor is
     * either connecting to or actively talking to its NameNode.
     */
    boolean isAlive() {
        // bpThread is only created in start(); guard against a call made
        // before the actor was started, which previously threw an NPE.
        if (!shouldServiceRun || bpThread == null || !bpThread.isAlive()) {
            return false;
        }
        return runningState == BPServiceActor.RunningState.RUNNING
                || runningState == BPServiceActor.RunningState.CONNECTING;
    }

    /** Identifies both the block pool and the NameNode this actor serves. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(bpos.toString());
        sb.append(" service to ").append(nnAddr);
        return sb.toString();
    }

    /** @return the NameNode address this actor heartbeats to. */
    InetSocketAddress getNNSocketAddress() {
        return nnAddr;
    }

    /**
     * Used to inject a spy NN in the unit tests.
     *
     * @param dnProtocol replacement NameNode RPC proxy
     */
    @VisibleForTesting
    void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
        this.bpNamenode = dnProtocol;
    }

    /** @return the current NameNode RPC proxy (may be a test spy). */
    @VisibleForTesting
    DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
        return bpNamenode;
    }

    /**
     * Perform the first part of the handshake with the NameNode.
     * This calls <code>versionRequest</code> to determine the NN's
     * namespace and version info. It automatically retries until
     * the NN responds or the DN is shutting down.
     *
     * @return the NamespaceInfo
     * @throws IOException if the DN shuts down before the NN ever responds,
     *         or if the NN's version is unacceptable
     */
    @VisibleForTesting
    NamespaceInfo retrieveNamespaceInfo() throws IOException {
        NamespaceInfo nsInfo = null;
        while (shouldRun()) {
            try {
                nsInfo = bpNamenode.versionRequest();
                LOG.debug(this + " received versionRequest response: " + nsInfo);
                break;
            } catch (SocketTimeoutException e) {  // namenode is busy
                // Include the exception so the failure is diagnosable
                // (previously the cause was silently dropped).
                LOG.warn("Problem connecting to server: " + nnAddr, e);
            } catch (IOException e) {  // namenode is not available
                LOG.warn("Problem connecting to server: " + nnAddr, e);
            }

            // try again in a second
            sleepAndLogInterrupts(5000, "requesting version info from NN");
        }

        if (nsInfo != null) {
            checkNNVersion(nsInfo);
        } else {
            throw new IOException("DN shut down before block pool connected");
        }
        return nsInfo;
    }

    /**
     * Verify that the NameNode's software version is acceptable to this
     * DataNode.
     *
     * @throws IncorrectVersionException if the NN is older than the
     *         configured minimum acceptable NameNode version
     */
    private void checkNNVersion(NamespaceInfo nsInfo)
            throws IncorrectVersionException {
        // build and layout versions should match
        final String nnVersion = nsInfo.getSoftwareVersion();
        final String minNNVersion = dnConf.getMinimumNameNodeVersion();
        if (VersionUtil.compareVersions(nnVersion, minNNVersion) < 0) {
            IncorrectVersionException ive = new IncorrectVersionException(
                    minNNVersion, nnVersion, "NameNode", "DataNode");
            LOG.warn(ive.getMessage());
            throw ive;
        }
        final String dnVersion = VersionInfo.getVersion();
        if (!nnVersion.equals(dnVersion)) {
            // A mere mismatch (within the minimum) is expected mid-upgrade.
            LOG.info("Reported NameNode version '" + nnVersion + "' does not match " +
                    "DataNode version '" + dnVersion + "' but is within acceptable " +
                    "limits. Note: This is normal during a rolling upgrade.");
        }
    }

    private void connectToNNAndHandshake() throws IOException {
        // get NN proxy
        // bpNamenode is the client-side RPC proxy for the NameNode; once
        // obtained, all subsequent communication from this actor to the
        // NameNode goes through RPC calls on this proxy.
        bpNamenode = dn.connectToNN(nnAddr);

        // First phase of the handshake with NN - get the namespace info.
        // Invokes the NameNode's versionRequest RPC through the proxy to
        // fetch its NamespaceInfo (retrying until the NN answers).
        NamespaceInfo nsInfo = retrieveNamespaceInfo();

        // Verify that this matches the other NN in this HA pair.
        // This also initializes our block pool in the DN if we are
        // the first NN connection for this BP.
        // Two effects: (1) checks that both NNs of the HA pair report the
        // same NamespaceInfo; (2) on the first NN connection for this block
        // pool, initializes the DataNode-side storage for it and starts the
        // related background threads.
        bpos.verifyAndSetNamespaceInfo(nsInfo);

        // Second phase of the handshake with the NN.
        // Register this DataNode with the NameNode.
        register();
    }

    // This is useful to make sure NN gets Heartbeat before Blockreport
    // upon NN restart while DN keeps retrying Otherwise,
    // 1. NN restarts.
    // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
    // 3. After reregistration completes, DN will send Blockreport first.
    // 4. Given NN receives Blockreport after Heartbeat, it won't mark
    //    DatanodeStorageInfo#blockContentsStale to false until the next
    //    Blockreport.
    /** Force a heartbeat on the next iteration of the actor's main loop. */
    void scheduleHeartbeat() {
        // Zeroing the timestamp makes the interval check in offerService()
        // fire immediately.
        lastHeartbeat = 0;
    }

    /**
     * Arranges for the data node to send a block report soon.
     *
     * @param delay if positive, schedule the report at a random point within
     *              the next {@code delay} ms (spreads load across DNs);
     *              otherwise send it at the next heartbeat
     */
    void scheduleBlockReport(long delay) {
        if (delay > 0) { // send BR after random delay
            // Backdate lastBlockReport so the interval check in blockReport()
            // trips within [0, delay) ms from now.
            lastBlockReport = Time.now()
                    - (dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int) (delay)));
        } else { // send at next heartbeat
            lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
        }
        resetBlockReportTime = true; // reset future BRs for randomness
    }

    /**
     * Report a corrupt block on the given storage to the NameNode.
     * Best-effort: RPC failures are logged and otherwise ignored. No-op if
     * this actor has not yet registered with the NN.
     */
    void reportBadBlocks(ExtendedBlock block,
                         String storageUuid, StorageType storageType) {
        if (bpRegistration == null) {
            // Not registered yet; we have no identity to report under.
            return;
        }
        DatanodeInfo[] dnArr = {new DatanodeInfo(bpRegistration)};
        String[] uuids = {storageUuid};
        StorageType[] types = {storageType};
        LocatedBlock[] blocks = {new LocatedBlock(block, dnArr, uuids, types)};

        try {
            bpNamenode.reportBadBlocks(blocks);
        } catch (IOException e) {
            /* One common reason is that NameNode could be in safe mode.
             * Should we keep on retrying in that case?
             */
            // Fixed the previously garbled message (" to namenode :  Exception").
            LOG.warn("Failed to report bad block " + block + " to namenode", e);
        }
    }

    /**
     * Report received blocks and delete hints to the Namenode for each
     * storage. Drains the per-storage pending queues under the lock, then
     * performs the RPC outside the lock; on failure the drained entries are
     * requeued so they are retried on the next IBR.
     *
     * @throws IOException if the blockReceivedAndDeleted RPC fails
     */
    private void reportReceivedDeletedBlocks() throws IOException {

        // Generate a list of the pending reports for each storage under the lock
        ArrayList<StorageReceivedDeletedBlocks> reports =
                new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
        synchronized (pendingIncrementalBRperStorage) {
            for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
                    pendingIncrementalBRperStorage.entrySet()) {
                final DatanodeStorage storage = entry.getKey();
                final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();

                if (perStorageMap.getBlockInfoCount() > 0) {
                    // Send newly-received and deleted blockids to namenode
                    ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
                    reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
                }
            }
            // Everything pending has been drained; clear the immediate flag
            // while still holding the lock.
            sendImmediateIBR = false;
        }

        if (reports.size() == 0) {
            // Nothing new to report.
            return;
        }

        // Send incremental block reports to the Namenode outside the lock
        boolean success = false;
        try {
            bpNamenode.blockReceivedAndDeleted(bpRegistration,
                    bpos.getBlockPoolId(),
                    reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
            success = true;
        } finally {
            if (!success) {
                synchronized (pendingIncrementalBRperStorage) {
                    for (StorageReceivedDeletedBlocks report : reports) {
                        // If we didn't succeed in sending the report, put all of the
                        // blocks back onto our queue, but only in the case where we
                        // didn't put something newer in the meantime.
                        PerStoragePendingIncrementalBR perStorageMap =
                                pendingIncrementalBRperStorage.get(report.getStorage());
                        perStorageMap.putMissingBlockInfos(report.getBlocks());
                        sendImmediateIBR = true;
                    }
                }
            }
        }
    }

    /**
     * Look up, lazily creating on first use, the pending incremental block
     * report state for the given storage.
     * <p>
     * Caller must synchronize on pendingIncrementalBRperStorage.
     *
     * @return pending incremental block report for given {@code storage}
     */
    private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
            DatanodeStorage storage) {
        PerStoragePendingIncrementalBR brState =
                pendingIncrementalBRperStorage.get(storage);
        if (brState != null) {
            return brState;
        }
        // First incremental-BR activity for this storage under this actor:
        // create and remember a fresh per-storage map.
        brState = new PerStoragePendingIncrementalBR();
        pendingIncrementalBRperStorage.put(storage, brState);
        return brState;
    }

    /**
     * Add a blockInfo for notification to NameNode. If another entry
     * exists for the same block it is removed.
     * <p>
     * Caller must synchronize access using pendingIncrementalBRperStorage.
     */
    void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
                                        DatanodeStorage storage) {
        // At most one queued entry may exist for a given block, possibly
        // filed under a different storage; drop it before enqueueing anew.
        for (PerStoragePendingIncrementalBR perStorage :
                pendingIncrementalBRperStorage.values()) {
            if (perStorage.removeBlockInfo(bInfo)) {
                break;
            }
        }
        getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
    }

    /*
     * Informing the name node could take a long long time! Should we wait
     * till namenode is informed before responding with success to the
     * client? For now we don't.
     *
     * This is an asynchronous notification: the block is only queued here,
     * so the caller does not wait for the NameNode round-trip and can
     * immediately ack upstream (ultimately back to the HDFS client).
     */
    void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo,
                             String storageUuid, boolean now) {
        synchronized (pendingIncrementalBRperStorage) {
            addPendingReplicationBlockInfo(
                    bInfo, dn.getFSDataset().getStorage(storageUuid));
            sendImmediateIBR = true;
            // If now is true, the report is sent right away.
            // Otherwise, it will be sent out in the next heartbeat.
            if (now) {
                pendingIncrementalBRperStorage.notifyAll();
            }
        }
    }

    /**
     * Queue a deleted-block notification for a future incremental block
     * report. Does not set sendImmediateIBR or wake the actor, so the
     * report goes out on the normal IBR timer.
     */
    void notifyNamenodeDeletedBlock(
            ReceivedDeletedBlockInfo bInfo, String storageUuid) {
        synchronized (pendingIncrementalBRperStorage) {
            addPendingReplicationBlockInfo(
                    bInfo, dn.getFSDataset().getStorage(storageUuid));
        }
    }

    /**
     * Run an immediate block report on this thread. Used by tests.
     * Zeroes the BR and heartbeat timers, wakes the actor thread, then
     * blocks until the actor has sent the report (observed when the actor
     * sets lastBlockReport to a non-zero value).
     */
    @VisibleForTesting
    void triggerBlockReportForTests() {
        synchronized (pendingIncrementalBRperStorage) {
            lastBlockReport = 0;
            lastHeartbeat = 0;
            pendingIncrementalBRperStorage.notifyAll();
            while (lastBlockReport == 0) {
                try {
                    // Bounded wait so we re-check the condition periodically.
                    pendingIncrementalBRperStorage.wait(100);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /**
     * Force an immediate heartbeat and block until the actor has sent it
     * (observed when the actor sets lastHeartbeat non-zero). Used by tests.
     */
    @VisibleForTesting
    void triggerHeartbeatForTests() {
        synchronized (pendingIncrementalBRperStorage) {
            lastHeartbeat = 0;
            pendingIncrementalBRperStorage.notifyAll();
            while (lastHeartbeat == 0) {
                try {
                    // Bounded wait so we re-check the condition periodically.
                    pendingIncrementalBRperStorage.wait(100);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /**
     * Force an immediate deletion (incremental block) report and block
     * until the actor has sent it (observed when the actor sets
     * lastDeletedReport non-zero). Used by tests.
     */
    @VisibleForTesting
    void triggerDeletionReportForTests() {
        synchronized (pendingIncrementalBRperStorage) {
            lastDeletedReport = 0;
            pendingIncrementalBRperStorage.notifyAll();

            while (lastDeletedReport == 0) {
                try {
                    // Bounded wait so we re-check the condition periodically.
                    pendingIncrementalBRperStorage.wait(100);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /** @return true if an incremental block report is waiting to be sent. */
    @VisibleForTesting
    boolean hasPendingIBR() {
        return sendImmediateIBR;
    }

    // Highest block report ID handed out so far; used to keep IDs strictly
    // increasing even when System.nanoTime() repeats a value.
    private long prevBlockReportId = 0;

    /** @return a block report ID strictly greater than any previous one. */
    private long generateUniqueBlockReportId() {
        // Clamp to at least one past the previous ID in case nanoTime()
        // returns a value no larger than the last ID we issued.
        long id = Math.max(System.nanoTime(), prevBlockReportId + 1);
        prevBlockReportId = id;
        return id;
    }

    /**
     * Report the list blocks to the Namenode
     *
     * @return DatanodeCommands returned by the NN. May be null.
     * @throws IOException
     */
    List<DatanodeCommand> blockReport() throws IOException {
        // send block report if timer has expired.
        final long startTime = now();
        // A full report is only sent once per blockReportInterval
        // (6 hours by default per dnConf).
        if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
            return null;
        }

        final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();

        // Flush any block information that precedes the block report. Otherwise
        // we have a chance that we will miss the delHint information
        // or we will report an RBW replica after the BlockReport already reports
        // a FINALIZED one.
        reportReceivedDeletedBlocks();
        lastDeletedReport = startTime;

        long brCreateStartTime = now();
        Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
                dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());

        // Convert the reports to the format expected by the NN.
        int i = 0;
        int totalBlockCount = 0;
        StorageBlockReport reports[] =
                new StorageBlockReport[perVolumeBlockLists.size()];

        for (Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
            BlockListAsLongs blockList = kvPair.getValue();
            reports[i++] = new StorageBlockReport(
                    kvPair.getKey(), blockList.getBlockListAsLongs());
            totalBlockCount += blockList.getNumberOfBlocks();
        }

        // Send the reports to the NN.
        int numReportsSent = 0;
        int numRPCs = 0;
        boolean success = false;
        long brSendStartTime = now();
        long reportId = generateUniqueBlockReportId();
        try {
            if (totalBlockCount < dnConf.blockReportSplitThreshold) {
                // Below split threshold, send all reports in a single message.
                DatanodeCommand cmd = bpNamenode.blockReport(
                        bpRegistration, bpos.getBlockPoolId(), reports,
                        new BlockReportContext(1, 0, reportId));
                numRPCs = 1;
                numReportsSent = reports.length;
                if (cmd != null) {
                    cmds.add(cmd);
                }
            } else {
                // Send one block report per message.
                for (int r = 0; r < reports.length; r++) {
                    StorageBlockReport singleReport[] = {reports[r]};
                    DatanodeCommand cmd = bpNamenode.blockReport(
                            bpRegistration, bpos.getBlockPoolId(), singleReport,
                            new BlockReportContext(reports.length, r, reportId));
                    numReportsSent++;
                    numRPCs++;
                    if (cmd != null) {
                        cmds.add(cmd);
                    }
                }
            }
            success = true;
        } finally {
            // Log the block report processing stats from Datanode perspective
            long brSendCost = now() - brSendStartTime;
            long brCreateCost = brSendStartTime - brCreateStartTime;
            dn.getMetrics().addBlockReport(brSendCost);
            final int nCmds = cmds.size();
            LOG.info((success ? "S" : "Uns") +
                    "uccessfully sent block report 0x" +
                    Long.toHexString(reportId) + ",  containing " + reports.length +
                    " storage report(s), of which we sent " + numReportsSent + "." +
                    " The reports had " + totalBlockCount +
                    " total blocks and used " + numRPCs +
                    " RPC(s). This took " + brCreateCost +
                    " msec to generate and " + brSendCost +
                    " msecs for RPC and NN processing." +
                    " Got back " +
                    ((nCmds == 0) ? "no commands" :
                            ((nCmds == 1) ? "one command: " + cmds.get(0) :
                                    (nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
                    ".");
        }
        // Schedule the next report relative to this report's start time.
        scheduleNextBlockReport(startTime);
        return cmds.size() == 0 ? null : cmds;
    }

    /**
     * Update lastBlockReport so the next full block report fires at the
     * right time: randomized after the first report, then aligned to the
     * original schedule thereafter.
     */
    private void scheduleNextBlockReport(long previousReportStartTime) {
        // If we have sent the first set of block reports, then wait a random
        // time before we start the periodic block reports.
        if (resetBlockReportTime) {
            lastBlockReport = previousReportStartTime -
                    DFSUtil.getRandom().nextInt((int) (dnConf.blockReportInterval));
            resetBlockReportTime = false;
        } else {
            /* say the last block report was at 8:20:14. The current report
             * should have started around 9:20:14 (default 1 hour interval).
             * If current time is :
             *   1) normal like 9:20:18, next report should be at 10:20:14
             *   2) unexpected like 11:35:43, next report should be at 12:20:14
             */
            // Advance by a whole number of intervals (integer division) so
            // the schedule stays anchored to the original report time.
            lastBlockReport += (now() - lastBlockReport) /
                    dnConf.blockReportInterval * dnConf.blockReportInterval;
        }
    }

    /**
     * Send a cache report to the NameNode if caching is enabled and the
     * cache report interval has elapsed.
     *
     * @return the command returned by the NN, or null if no report was sent
     *         or the NN returned no command
     * @throws IOException if the cacheReport RPC fails
     */
    DatanodeCommand cacheReport() throws IOException {
        // If caching is disabled, do not send a cache report
        if (dn.getFSDataset().getCacheCapacity() == 0) {
            return null;
        }
        // send cache report if timer has expired.
        DatanodeCommand cmd = null;
        final long startTime = Time.monotonicNow();
        if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending cacheReport from service actor: " + this);
            }
            lastCacheReport = startTime;

            String bpid = bpos.getBlockPoolId();
            List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
            long createTime = Time.monotonicNow();

            cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
            long sendTime = Time.monotonicNow();
            long createCost = createTime - startTime;
            long sendCost = sendTime - createTime;
            dn.getMetrics().addCacheReport(sendCost);
            // Guard the debug message so the string is not built when debug
            // logging is disabled (matches the guard above).
            if (LOG.isDebugEnabled()) {
                LOG.debug("CacheReport of " + blockIds.size()
                        + " block(s) took " + createCost + " msec to generate and "
                        + sendCost + " msecs for RPC and NN processing");
            }
        }
        return cmd;
    }

    /**
     * Send one heartbeat RPC to the NameNode, carrying the current
     * per-storage reports, cache stats and transfer/xceiver counts.
     *
     * @return the NameNode's heartbeat response
     * @throws IOException if the RPC fails
     */
    HeartbeatResponse sendHeartBeat() throws IOException {
        // Include the current per-storage usage reports in the heartbeat.
        StorageReport[] reports =
                dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending heartbeat with " + reports.length +
                    " storage reports from service actor: " + this);
        }

        return bpNamenode.sendHeartbeat(bpRegistration,
                reports,
                dn.getFSDataset().getCacheCapacity(),
                dn.getFSDataset().getCacheUsed(),
                dn.getXmitsInProgress(),
                dn.getXceiverCount(),
                dn.getFSDataset().getNumFailedVolumes());
    }

    //This must be called only by BPOfferService
    /** Start the actor thread if it is not already running (idempotent). */
    void start() {
        if (bpThread != null && bpThread.isAlive()) {
            //Thread is started already
            return;
        }
        bpThread = new Thread(this, formatThreadName());
        bpThread.setDaemon(true); // needed for JUnit testing
        bpThread.start();
    }

    /** Build the actor thread's name from the data dirs and NN address. */
    private String formatThreadName() {
        Collection<StorageLocation> dirs =
                DataNode.getStorageLocations(dn.getConf());
        StringBuilder name = new StringBuilder("DataNode: [");
        name.append(dirs.toString()).append("] ");
        name.append(" heartbeating to ").append(nnAddr);
        return name.toString();
    }

    //This must be called only by blockPoolManager.
    /** Ask the actor to shut down and interrupt its thread if it exists. */
    void stop() {
        // Clear the run flag first so the loop exits even if the interrupt
        // is consumed elsewhere.
        shouldServiceRun = false;
        if (bpThread == null) {
            return;
        }
        bpThread.interrupt();
    }

    //This must be called only by blockPoolManager
    /**
     * Block until the actor thread terminates. If the calling thread is
     * interrupted while waiting, its interrupt status is restored so the
     * caller can observe the interruption.
     */
    void join() {
        try {
            if (bpThread != null) {
                bpThread.join();
            }
        } catch (InterruptedException ie) {
            // Re-assert the interrupt instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    //Cleanup method to be called by current thread before exiting.
    private synchronized void cleanUp() {

        // Stop the service loop, close the NN RPC proxy, and let the owning
        // BPOfferService detach this actor.
        shouldServiceRun = false;
        IOUtils.cleanup(LOG, bpNamenode);
        bpos.shutdownActor(this);
    }

    /**
     * React to the rolling-upgrade status carried in a heartbeat response.
     * A null status means no rolling upgrade is in progress; a status for a
     * different block pool is logged as an error and ignored.
     */
    private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
        RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus();
        // Use equals() rather than compareTo() != 0 — this is a pure
        // equality check on block pool IDs, not an ordering comparison.
        if (rollingUpgradeStatus != null &&
                !rollingUpgradeStatus.getBlockPoolId().equals(bpos.getBlockPoolId())) {
            // Can this ever occur?
            LOG.error("Invalid BlockPoolId " +
                    rollingUpgradeStatus.getBlockPoolId() +
                    " in HeartbeatResponse. Expected " +
                    bpos.getBlockPoolId());
        } else {
            bpos.signalRollingUpgrade(rollingUpgradeStatus != null);
        }
    }

    /**
     * Main loop for each BP thread. Run until shutdown,
     * forever calling remote NameNode functions.
     */
    private void offerService() throws Exception {
        LOG.info("For namenode " + nnAddr + " using"
                + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
                + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
                + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
                + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
                + "; heartBeatInterval=" + dnConf.heartBeatInterval);

        //
        // Now loop for a long time....
        //
        while (shouldRun()) {
            try {
                final long startTime = now();

                //
                // Every so often, send heartbeat or block-report
                //
                // Send a heartbeat once (now - lastHeartbeat) reaches the
                // heartbeat interval (3 seconds by default per dnConf).
                if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
                    //
                    // All heartbeat messages include following info:
                    // -- Datanode name
                    // -- data transfer port
                    // -- Total capacity
                    // -- Bytes remaining
                    //
                    lastHeartbeat = startTime;
                    if (!dn.areHeartbeatsDisabledForTests()) {
                        // Perform the heartbeat RPC.
                        HeartbeatResponse resp = sendHeartBeat();
                        assert resp != null;
                        dn.getMetrics().addHeartbeat(now() - startTime);

                        // If the state of this NN has changed (eg STANDBY->ACTIVE)
                        // then let the BPOfferService update itself.
                        //
                        // Important that this happens before processCommand below,
                        // since the first heartbeat to a new active might have commands
                        // that we should actually process.

                        // Each NameNode of the HA pair (one active, one
                        // standby) has its own BPServiceActor that records
                        // that NN's HA state. When a heartbeat response shows
                        // the state changed (e.g. active -> standby), the
                        // actor updates its recorded state accordingly.
                        bpos.updateActorStatesFromHeartbeat(
                                this, resp.getNameNodeHaState());
                        state = resp.getNameNodeHaState().getState();

                        if (state == HAServiceState.ACTIVE) {
                            handleRollingUpgradeStatus(resp);
                        }

                        long startProcessCommands = now();
                        if (!processCommand(resp.getCommands()))
                            continue;
                        long endProcessCommands = now();
                        if (endProcessCommands - startProcessCommands > 2000) {
                            LOG.info("Took " + (endProcessCommands - startProcessCommands)
                                    + "ms to process " + resp.getCommands().length
                                    + " commands from NN");
                        }
                    }
                }

                // Incremental report of received/deleted blocks. Blocks are
                // not reported to the NN synchronously on receipt; they are
                // queued, and this actor batches them out either when
                // sendImmediateIBR is set or every deleteReportInterval ms.
                if (sendImmediateIBR ||
                        (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
                    reportReceivedDeletedBlocks();
                    lastDeletedReport = startTime;
                }

                // Full block report, sent once per blockReportInterval.
                List<DatanodeCommand> cmds = blockReport();
                // Process any commands the NameNode returned for it.
                processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

                // Cache report, governed by its own cacheReportInterval.
                DatanodeCommand cmd = cacheReport();
                processCommand(new DatanodeCommand[]{cmd});

                // Now safe to start scanning the block pool.
                // If it has already been started, this is a no-op.
                if (dn.blockScanner != null) {
                    dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
                }

                //
                // There is no work to do;  sleep until hearbeat timer elapses,
                // or work arrives, and then iterate again.
                //
                long waitTime = dnConf.heartBeatInterval -
                        (Time.now() - lastHeartbeat);
                synchronized (pendingIncrementalBRperStorage) {
                    if (waitTime > 0 && !sendImmediateIBR) {
                        try {
                            pendingIncrementalBRperStorage.wait(waitTime);
                        } catch (InterruptedException ie) {
                            LOG.warn("BPOfferService for " + this + " interrupted");
                        }
                    }
                } // synchronized
            } catch (RemoteException re) {
                String reClass = re.getClassName();
                // These server-side exceptions are fatal for this actor:
                // stop the service loop instead of retrying.
                if (UnregisteredNodeException.class.getName().equals(reClass) ||
                        DisallowedDatanodeException.class.getName().equals(reClass) ||
                        IncorrectVersionException.class.getName().equals(reClass)) {
                    LOG.warn(this + " is shutting down", re);
                    shouldServiceRun = false;
                    return;
                }
                LOG.warn("RemoteException in offerService", re);
                try {
                    long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
                    Thread.sleep(sleepTime);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            } catch (IOException e) {
                LOG.warn("IOException in offerService", e);
            }
        } // while (shouldRun())
    } // offerService

    /**
     * Register one bp with the corresponding NameNode
     * <p>
     * The bpDatanode needs to register with the namenode on startup in order
     * 1) to report which storage it is serving now and
     * 2) to receive a registrationID
     * <p>
     * issued by the namenode to recognize registered datanodes.
     *
     * @throws IOException if the actor is shut down before the registration
     *                     with the NameNode completes
     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
     */
    void register() throws IOException {
        // The handshake() phase loaded the block pool storage
        // off disk - so update the bpRegistration object from that info.
        // The registration carries the DatanodeID, which is how the namenode
        // identifies which datanode initiated this registration.
        bpRegistration = bpos.createRegistration();

        LOG.info(this + " beginning handshake with NN");

        while (shouldRun()) {
            try {
                // Use returned registration from namenode with updated fields
                bpRegistration = bpNamenode.registerDatanode(bpRegistration);
                break;
            } catch (EOFException e) {  // namenode might have just restarted
                LOG.info("Problem connecting to server: " + nnAddr + " :"
                        + e.getLocalizedMessage());
                sleepAndLogInterrupts(1000, "connecting to server");
            } catch (SocketTimeoutException e) {  // namenode is busy
                LOG.info("Problem connecting to server: " + nnAddr);
                sleepAndLogInterrupts(1000, "connecting to server");
            }
        }

        // If the actor was told to stop while we were still retrying, the
        // registration never succeeded; do not claim success below with a
        // stale/unconfirmed bpRegistration.
        if (!shouldRun()) {
            throw new IOException(
                    "DN shut down before block pool registered with NN");
        }

        LOG.info("Block pool " + this + " successfully registered with NN");
        bpos.registrationSucceeded(this, bpRegistration);

        // random short delay - helps scatter the BR from all DNs.
        // This only schedules the first (full) block report; the report itself
        // is sent later from the actor's main loop.
        scheduleBlockReport(dnConf.initialBlockReportDelay);
    }


    /**
     * Sleep for the given number of milliseconds, logging any interrupt.
     *
     * @param millis      how long to sleep
     * @param stateString human-readable description of what we were doing,
     *                    used only in the log message
     */
    private void sleepAndLogInterrupts(int millis,
                                       String stateString) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            LOG.info("BPOfferService " + this + " interrupted while " + stateString);
            // Preserve the interrupt status so callers and subsequent blocking
            // calls can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * No matter what kind of exception we get, keep retrying to offerService().
     * That's the loop that connects to the NameNode and provides basic DataNode
     * functionality.
     * <p>
     * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
     * happen either at shutdown or due to refreshNamenodes.
     */
    // Main work loop of the BPServiceActor thread: the heartbeat and the
    // block reports to the namenode are driven from here (BP = Block Pool).
    @Override
    public void run() {
        LOG.info(this + " starting to offer service");

        try {
            // Phase 1: connect to the NN and perform the handshake (this is
            // where the datanode registers on first startup). Retry until it
            // succeeds or initialization is declared failed.
            boolean initialized = false;
            while (!initialized) {
                try {
                    // setup storage
                    connectToNNAndHandshake();
                    initialized = true;
                } catch (IOException ioe) {
                    // Initial handshake, storage recovery or registration failed
                    runningState = RunningState.INIT_FAILED;
                    if (!shouldRetryInit()) {
                        runningState = RunningState.FAILED;
                        LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
                        return;
                    }
                    // Retry until all namenode's of BPOS failed initialization
                    LOG.error("Initialization failed for " + this + " "
                            + ioe.getLocalizedMessage());
                    sleepAndLogInterrupts(5000, "initializing");
                }
            }

            runningState = RunningState.RUNNING;

            // Phase 2: the steady-state service loop — heartbeats and block
            // reports are sent from offerService().
            while (shouldRun()) {
                try {
                    offerService();
                } catch (Exception ex) {
                    LOG.error("Exception in BPOfferService for " + this, ex);
                    sleepAndLogInterrupts(5000, "offering service");
                }
            }
            runningState = RunningState.EXITED;
        } catch (Throwable ex) {
            LOG.warn("Unexpected exception in block pool " + this, ex);
            runningState = RunningState.FAILED;
        } finally {
            LOG.warn("Ending block pool service for: " + this);
            cleanUp();
        }
    }

    // Initialization is retried only while this actor may still run and the
    // owning BPOfferService says retrying is still worthwhile.
    private boolean shouldRetryInit() {
        if (!shouldRun()) {
            return false;
        }
        return bpos.shouldRetryInit();
    }

    // The actor keeps running only while both this service and the datanode
    // as a whole are supposed to run (evaluation order preserved).
    private boolean shouldRun() {
        if (!shouldServiceRun) {
            return false;
        }
        return dn.shouldRun();
    }

    /**
     * Process an array of datanode commands.
     *
     * @param cmds an array of datanode commands; may be null
     * @return true if further processing may be required or false otherwise.
     */
    boolean processCommand(DatanodeCommand[] cmds) {
        if (cmds != null) {
            for (DatanodeCommand cmd : cmds) {
                try {
                    // A false return from the BPOfferService means this actor
                    // should stop processing the remaining commands.
                    if (!bpos.processCommandFromActor(cmd, this)) {
                        return false;
                    }
                } catch (IOException ioe) {
                    // Log and continue with the next command rather than
                    // aborting the whole batch.
                    LOG.warn("Error processing datanode Command", ioe);
                }
            }
        }
        return true;
    }

    /**
     * Best-effort delivery of an error report to the NameNode: a failure to
     * send is logged, never propagated to the caller.
     *
     * @param errCode error code to report
     * @param errMsg  human-readable error message
     */
    void trySendErrorReport(int errCode, String errMsg) {
        try {
            bpNamenode.errorReport(bpRegistration, errCode, errMsg);
        } catch (IOException e) {
            LOG.warn("Error reporting an error to NameNode " + nnAddr, e);
        }
    }

    /**
     * Report a bad block from another DN in this cluster.
     *
     * @param dnInfo the datanode holding the bad replica
     * @param block  the bad block
     * @throws IOException if the report cannot be delivered to the NameNode
     */
    void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block)
            throws IOException {
        DatanodeInfo[] locations = new DatanodeInfo[]{dnInfo};
        LocatedBlock badBlock = new LocatedBlock(block, locations);
        bpNamenode.reportBadBlocks(new LocatedBlock[]{badBlock});
    }

    /**
     * Re-register this block pool with the NameNode, e.g. after the NN asked
     * for it. No-op if the actor is shutting down.
     */
    void reRegister() throws IOException {
        if (!shouldRun()) {
            return;
        }
        // re-retrieve namespace info to make sure that, if the NN
        // was restarted, we still match its version (HDFS-2120)
        retrieveNamespaceInfo();
        // and re-register
        register();
        scheduleHeartbeat();
    }

    /**
     * Pending incremental block report (IBR) state for a single storage:
     * the received/deleted block events not yet sent to the NameNode,
     * keyed by block id so that a newer event for the same block replaces
     * an older one.
     */
    private static class PerStoragePendingIncrementalBR {
        private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
                Maps.newHashMap();

        /**
         * @return the number of blocks on this storage that have pending
         * incremental block reports.
         */
        int getBlockInfoCount() {
            return pendingIncrementalBR.size();
        }

        /**
         * Dequeue and return all pending incremental block report state,
         * leaving the internal map empty.
         *
         * @return the drained block infos (possibly an empty array)
         */
        ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
            final ReceivedDeletedBlockInfo[] drained =
                    pendingIncrementalBR.values().toArray(
                            new ReceivedDeletedBlockInfo[pendingIncrementalBR.size()]);
            pendingIncrementalBR.clear();
            return drained;
        }

        /**
         * Add blocks from blockArray to pendingIncrementalBR, unless the
         * block already exists in pendingIncrementalBR.
         *
         * @param blockArray list of blocks to add.
         * @return the number of missing blocks that we added.
         */
        int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
            int added = 0;
            for (ReceivedDeletedBlockInfo info : blockArray) {
                Long blockId = info.getBlock().getBlockId();
                if (!pendingIncrementalBR.containsKey(blockId)) {
                    pendingIncrementalBR.put(blockId, info);
                    added++;
                }
            }
            return added;
        }

        /**
         * Record (or overwrite) the pending incremental block report for a
         * single block.
         */
        void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
            pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
        }

        /**
         * Remove the pending incremental block report for a single block,
         * if one exists.
         *
         * @return true if a report was removed, false if no report existed
         * for the given block.
         */
        boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
            ReceivedDeletedBlockInfo removed =
                    pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId());
            return removed != null;
        }
    }

    /**
     * Schedule a block report (incremental or full, per the options) and
     * wake the actor thread so it is sent promptly.
     */
    void triggerBlockReport(BlockReportOptions options) throws IOException {
        final boolean incremental = options.isIncremental();
        // Log outside the lock; the message text depends only on the options.
        LOG.info(bpos.toString() + (incremental
                ? ": scheduling an incremental block report."
                : ": scheduling a full block report."));
        synchronized (pendingIncrementalBRperStorage) {
            if (incremental) {
                sendImmediateIBR = true;
            } else {
                lastBlockReport = 0;
            }
            pendingIncrementalBRperStorage.notifyAll();
        }
    }
}
